Agenda

Retos de la Ciencia de Datos

Como vimos el semestre pasado en “Introducción a ciencia de datos”:

  1. La mayor parte del tiempo dedicado a hacer un análisis de datos exitoso -70% a 80%- se nos va en hacer preprocesamiento a los datos que queremos analizar. Los datos nunca están limpios, siempre son messy por lo que siempre tenemos que limpiarlos, acomodarlos, completarlos, etc. Para grandes sets de datos esto puede complicarse más al requerir métodos computacionales para siquiera ver qué preprocesamiento se les tiene que hacer a los datos, o bien también requerimos pasar mucho tiempo en el feature engineering para encontrar las mejores variables que nos brindarán información al modelo -y bajar la maldición de la dimensionalidad-

  2. La iteración es un proceso fundamental en ciencia de datos, el modelado y el análisis requiere de pasar muchas veces por el set de datos. Por ejemplo, los algoritmos de stochastic gradient descent y expectation maximization requieren de pasar n mil veces por los datos para llegar a converger. También cuando hacemos análisis exploratorio normalmente un query -el resultado del mismo- nos llevará a preguntarnos la siguiente pregunta -otro query-. Cuando construimos modelos pasamos muchas veces por diferentes modelos o incluso en uno mismo cambiando sus hiperparámetros para encontar el mejor modelo con los mejores parámetros -magic loop, multi-armed bandit algorithm-

  3. La tarea de un data scientist no termina cuando tenemos el mejor modelo, termina cuando lo ponemos como una aplicación de datos que está en el mundo real alterando la realidad para la que fue creado. Hacer que esta aplicación de datos se convierta en parte de un servicio productivo requiere -normalmente- de que sea re-construido periódicamente o hasta en tiempo real -como lo están viendo en su clase de métodos analíticos-.

Dada esta característica es muy importante diferenciar entre hacer analítica de laboratorio y analítica para producción, en la primera estamos enfocados en análisis exploratorio tanto de datos como de modelos, en la segunda nos concentramos en analítica operativa haciendo que lo que desarrolamos en la analítica de laboratorio sea empaquetada de forma en que pueda informar decisiones en el mundo real. La analítica de laboratorio sucede en lenguajes como R, Octave e incluso Python -MATLAB!!!-, mientras que la analítica para producción los algoritmos o pipelines se reescriben en Python (con patrones de ingeniería de software), Java o C++ para cumplir con los tiempos de respuesta.

Claramente sería mejor que el lenguaje ocupado en el análisis de laboratorio sea el mismo que para el análisis para producción optimizando tiempos y recursos para llegar más rápido a producción y modificar la realidad… sin embargo R es lento y carece de falta de integración a los lenguajes e infraestructura que normalmente se ocupa en producción, y C++ o Java son herramientas HORRIBLES para hacer análisis exploratorio sobre todo por que no son herramientas REPL -Read Evaluate Print Loop-

Introducción

Spark fue creado en el laboratorio AMPLab de la Universidad de Berkeley (2014), también forma parte del Apache Software Foundation y es considerado como la primer solución open source de procesamiento distribuido hecha para Data Scientists.

Es considerada como la evolución de MapReduce de Hadoop. MapReduce revolucionó el procesamiento de datos de gran escala al permitir procesamiento en paralelo y que al aumentar el tamaño de los data sets el aumento en recursos computacionales es casi lineal! … al aumentar el tamaño del data set podemos escalar horizontalmente para que los trabajos se completen en el mismo tiempo, es resilente a las fallas en hardware -replicación de datos-.

Spark también mantiene la escalabilidad lineal y la tolerancia a fallos, pero extiende MapReduce en 3 formas:

  1. En lugar de tener un formato rígido de map \(\rightarrow\) reduce, Spark ejecuta operaciones más generales a través de un DAG -Directed Acyclic Graph- la implicación más grande de este cambio es que mientras que MapReduce requiere de escribir constantemente al DFS seleccionado para poder pasar resultados intermedios del map(pers) al reducer(s), Spark solo pasa los resultados intermedios al siguiente “paso” en el pipeline.

¿Un árbol es un DAG?

  1. Spark complementa las mejoras que brinda la estructura de DAG con un conjunto de transformaciones que permiten al usuario expresar los procesamientos de manera más natural por lo que se pueden expresar pipelines complejos en pocas líneas de código

  2. Spark permite tener procesamiento en memoria a través de las abstraciones Dataset y DataFrame con las que es posible que podamos materializar cualquier punto de procesamiento de un pipeline en memoria por lo que si hay steps más adelante en el pipeline que ocupen estos datos no requieren de ser reprocesados o vueltos a cargar de disco!!!. Esta característica permite que Spark sea el framework seleccionado en algoritmos iterativos que requieren de pasar varias veces sobre un set de datos.

La razón más importante para seleccionar Spark por sobre otros frameworks es que resuelve varios de los retos de ciencia de datos mencionados anteriormente:

  • El cuello de botella más grande para hacer productos de datos no es el CPU, ni la memoria, ni el disco, ni la red sino la productividad analítica. Spark permite hacer el pipeline desde preprocesamiento hasta evaluación del modelo en un solo ambiente!, lo que acelera los tiempos de desarrollo
  • Spark tiene una serie de librerías que le aportan las características de un REPL
  • Los API de Spark proveen de funcionalidad para hacer transformaciones de datos ya sea en funciones estadísticas, machine learning o álgebra matricial para hacer ETLs
  • Es posible ocupar Spark utilizando Python, Java o Scala
  • La capacidad de Spark de mantener objetos en memoria lo hace ideal para hacer machine learning
  • Spark reduce el espacio entre analítica de laboratorio y analítica para producción

Spark está integrado a muchas de las herramientas del ecosistema de Hadoop:

  • Puede leer y escribir de la mayoría de los formatos propios de Hadoop: Avro y Parquet son los más importantes
  • Puede leer y escribir de bases de datos NoSQL como Cassandra, HBase
  • Tiene una librería para streaming -Spark Streaming- que puede ingerir de sistemas de streaminig como Kafka y Flume
  • Tiene una librería para SQL -SparkSQL- con la que puede interactuar con el metastore de Hive
  • Puede correr con YARN lo que permite compartir recursos del clúster dinámicamente

… Por cierto, Spark está desarrollado en Scala, por lo que la gente que desarrollo Spark sugiere fuertemente utilizar Scala…pero, puedes ocupar PySpark -un wraper de Python para Spark- (también puedes conectar R con Spark con la libería sparklyr)

Teoría de Spark

Ecosistema de Spark


Fuente: Spark overview

  • SparkSQL: La librería que le permite a Spark analizar, como dplyr (EDA)
  • Spark MLlib: La librería que le permite a Spark realizar machine learning, como scikitlearn
  • Spark GraphX: La librería que le permite a Spark hacer análisis y representación de grafos, como Neo4j y la librería de networkx de Python
  • Spark Streaming: La librería que le permite a Spark hacer análisis en streaming, como apache Kafka (aunque se prefiere Kafka por la latencia)

\(\rightarrow\) Nosotros veremos todos :)

Arquitectura de Spark


Fuente: Overview Spark

  • Cada aplicación de Spark está coordinada por el SparkContext también se le conoce como driver
  • El SparkContexT se conecta al cluster manager que puede ser de diferentes tipos: YARN, Mesos -otro clúster manager- , Kubernetes (la última version, 2.3.0) -centrado en infraestructura en contenedores- o un cluser standalone. El cluster manager como YARN es el que adminstra todos los recursos en el clúster
  • En los nodos del clúster se crean Executors que son procesos que correrán el procesamiento y que guardarán datos de la aplicación
  • El cluster manager será el responsable de enviar el código de la aplicación -en un JAR si se ocupó Scala o en Python si se ocupó pyspark- a los executors

Características:

  • Esta arquitectura no permite que se puedan compartir datos entre diferentes aplicaciones de Spark -diferentes SparkContext sin que tengan que escribir a un sistema de almacentamiento (DFS)
  • Spark es agnóstico a la tecnología ocupada como administrador de clúster!
  • El driver debe disponible para escuchar y aceptar conexiónes de los executor
  • Como el driver es el que registra trabajos en el cluster schedules debe correr cerca de los nodos trabajadores workers por lo que ambos se sugiere deben estar al alcance en la misma área local de red.

Spark programming model

Hacer un programa en Spark consiste a grandes rasgos de los siguientes 3 pasos:

  1. Definir un set de transformaciones en el set de datos de entrada -que normalmente se encuentra en algún sabor de DFS-
  2. Invocar acciones que hacen que la salida de las transformaciones hechas a los datos sea persistida o regresarlos a la memoria local
  3. Hacer procesamiento local de los resultados obtenidos de forma distribuida

Spark Shell

Es el REPL de Spark por default para Scala, aunque tambien existe un shell para Python :) -pyspark-. Esta consola nos permite definir funciones y manipular datos, es como R sin IDE.


  • :help para ver todos los comandos disponibles en el shell
  • :history permite buscar nombres de variables o funciones que han sido generadas anteriormente en la sesión
  • :paste permite insertar en la consola algo copiado del clipboard (cualquier cosa fuerta de Spark)

SparkContext y SparkSession

El objeto más importante dentro de Spark es Resilent Distributed Dataset (RDD), una abstracción que representa una colección de objetos que puede ser distribuido en varios nodos de un clúster. Un RDD representa una forma de describir los procesamientos que queremos realizar en nuestros datos como una secuencia de pasos independientes y pequeños. Hay dos maneras de crear un RDD:

  1. Utilizar el SparkContext para crear un RDD de una fuente externos
  2. Ejecutar una transformación en una o más RDD

\(\rightarrow\) Todo en Spark son acciones o transformaciones y solo las acciones hacen que el procesamiento distribuido se lleve a cabo -antes no!, Spark es lazy!-. Las transformaciones son las operaciones que realizamos a los datos para “modificarlos”: filtros, agregaciones, intersecciones, uniones, joins, etc. Transformations.

Una acción hace que todas las transformaciones definidas antes de la acción se ejecuten en el clúster*, algunos ejemplos de acciones son: count, collect, first, take, saveAs…, foreach, etc. Actions


En Spark un DataFrame es una abstracción construida arriba de un RDD no son semejantes a los dataframes de Python Pandas o a R principalmente porque un DataFrame en Spark representa data sets distribuidos en un clúster, no datos locales donde cada renglón está en la misma máquina -pequeña sutil diferencia-. Para trabajar con los DataFrames se ocupa el SparkSession, para trabajar directamente con los RDD se ocupa el SparkContext.


SparkSession permite tener acceso a las funciones de SQL de Spark y a trabajar directamente con el DataFrame, SparkContext permite trabajar directamente con el RDD y hacer paralelización explícita. Desde Spark 2.x trabajamos directamente con el SparkSession, aunque sigue siendo posible trabajar con el SparkContext y el RDD. Un SparkSession siempre contiene al menos un SparkContext.

Debido a que SparkSession es un wrapper al SparkContext, la única forma de acceder al SparkContext es a través del SparkSession

\(\rightarrow\) Ver en pyspark

Decíamos que las acciones generadas sobre los RDD son las que hacen que se procesen los datos en el clúster, sin embargo existen algunas acciones que hacen que las cosas se pasen del clúster al driver (recordándo la arquitectura de Spark, eso es 1 solo nodo!).

  • collect: Trae al driver “todo” el data set, así como en dplyr es peligroso hacer un collect() sin conocer la cantidad de datos que traeremos, también en Spark puede ser peligroso
  • first: Permite traer al driver la primer observación del Dataframe que encuentre, se ocupa como un glimpse de R pero solo de 1 observación (mucho menos costosa que un collect)
  • take: Permite traer \(n\) observaciones del dataframe al driver
  • show: Muestra las primeras 20 observaciones en el Dataframe

También existe una operación que es importante entender, Shuffle. Este tipo de operaciones hace que los datos se tengan que redistribuir en el clúster lo que implica copiar datos a través de los executors y máquinas. Esta operación es muy costosa y compleja porque involucra I/O a disco, serialización de datos, y I/O de red; por lo que deberemos evitar en la medida de lo posible ocupar operaciones que sabemos que generan shuffle en los datos, o por lo menos conocer cuáles operaciones generarán shuffle para tomarlas en cuenta en el performance de la aplicación.

Operaciones que generan shuffle: repartition, coalesce, todas las operaciones que tienen un ByKey en su nombre, cogroup y join

El shuffle de Spark es como el shuffle de MapReduce, se requiere de tareas map que organizarán los datos y tareas reduce que agregarán los datos, pero la forma de ejecutarlos es diferente que la de MapReduce. Internamente los resultados individuales de cada task map son almacenados en memoria hasta que ya no quepan, entonces son ordenados basados en la partición target y escritos en un solo archivo. En el lado del reduce los tasks leen los bloques que les corresponden.

Algunas operaciones son más costosas que otras porque requieren estructuras de datos más pesados en memoria que otros -reduceByKey y aggregateByKey-

El objetivo del shuffle consiste en mover datos con la misma llave a un solo executor para que ese executor realice una operación específica sobre él.


Fuente: Spark shuffle

Spark en AWS

Para ocupar Spark en AWS solo tenemos que cambiar la selección de qué tipo de aplicación crear a Spark en lugar de la CoreHadoop que antes ocupábamos, todo lo demás se mantiene igual.


Scala

Programación funcional: Paradigma de programación basada en el cálculo lambda y en el uso de funciones matemáticas donde se evita guardar el estado y datos que cambien. Por el contrario, la programación imperativa enfatiza los cambios de estado mediante la mutación de variables.

Scala (2003) se considera la evolución de Java, porque fue diseñado pensando específicamente en ser un mejor lenguaje tomando como base Java; sigue siendo un lenguaje orientado a objetos pero incluyendo las características de programación funcional, corre sobre la JVM.

Lengujes de programación funcional:

  • Lisp (puro)
  • Haskell (puro)
  • Clojure (híbrido)
  • Python (híbrido): listas comprehensivas y funciones lambda
  • R
  • Scala
  • Erlang

I/O Spark

Cargar datos a Spark

  • CSV: spark.read.csv("path/to/file/in/dfs")


  • Parquet: spark.read.parquet("/path/to/file/in/dfs")


  • Avro: Se requiere de cargar la librería de Avro de databricks (Avro support), bajar el spark-avro repo, seguir las indicaciones. Cuando cargues el shell hay que poner bin/pyspark --packages com.databricks:spark-avro_2.11:4.0.0

spark.read.format("com.databricks.spark.avro").load("/path/to/file/in/dfs/")


  • JSON: spark.read.json("/path/to/file/in/dfs")


  • JDBC: Se pueden jalar datos de tablas almacenadas en Hive! o bien de bases de datos directamente (hay que pasarle las propiedades del conector jdbc a ocupar, usuario, password, etc). Al igual que con el jar de Avro es necesario enviarle al shell el jar del driver jdbc que se va a ocupar: bin/pyspark --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar


Guardar datos de Spark

  • CSV: df.write.csv("/path/in/dfs")

  • Parquet: df.write.parquet("/path/in/dfs")

  • Avro: df.write.format("com.databricks.spark.avro").save("/path/in/dfs")

  • JSON: df.write.json("/path/in/dfs")

SparkSQL

Funciones para el Dataframe

  • count(): Obtiene el número de observaciones que tiene el DataFrame
  • columns: Regresa todas las columnas de un Dataframe
  • describe(): Obtiene la descripción del Dataframe o de una columna
  • select(): Permite seleccionar uno o más “columnas” en el Dataframe
  • distinct(): Regresa un nuevo Dataframe que contiene observaciones únicas en el Dataframe al que se le aplica el distinct
  • drop(): Regresa un nuevo Dataframe que contiene las columnas que no fueron dropeadas
  • dropDuplicates(): Regresa un nuevo Dataframe con los renglones repetidos eliminados (una sola observación se queda)
  • dropna: Regresa un nuevo Dataframe eliminando observaciones que tengan valores nulos
  • fillna: Reemplaza los valores nulos con valores que tu defines por columna
  • filter: Como el filter de dplyr, permite filtrar las observaciones de un Dataframe que cumplen cierta(s) condición(s) lógica
  • foreach(f): Aplica la función \(f\) a cada observación en el Dataframe, es como el sapply de dplyr
  • groupBy: Agrupa los elementos de un Dataframe bajo cierto criterio para que después se puedan hacer operaciones de agregación sobre estos grupos
  • intersect: Regresa un nuevo Dataframe con las observaciones que coinciden en ambos Dataframes df.intersect(df2)
  • join: Hace un join entre dos Dataframes sobre alguna(s) columna(s). Hay innerr, outerr, full, left y right
  • limit: Limita el resultado que se regresa al driver
  • orderBy: Regresa un nuevo Dataframe ordenando las observaciones por el criterio especificado
  • rdd: Regresa el Dataframe como un RDD
  • sort: Regresa un nuevo Dataframe ordenado por algún criterio(s)
  • sample: Escoge una fracción del subconjunto del Dataframe original, es posible ocupar una semilla (seed)
  • summary: Solo desde la versión 2.3. -en AWS no hay esa versión aún-
  • subtract: Regresa un nuevo Dataframe que contiene las observaciones de este Dataframe que no están en el otro Dataframe
  • stat: Se crea un objeto de tipo DataFrameStatFunctions a través del cual se pueden solicitar funciones estadísticas básicas: ApproxQuantile, corr, cov, avg, también se pueden ocupar window functions!!! :), etc.
  • toDF:
  • toJSON: Convierte el Dataframe de Spark a un RDD donde cada observación es un JSON
  • toPandas: Convierte el Dataframe de Spark en uno de Pandas \(\rightarrow\) ¿esto lo trae al drive?
  • union: Regresa un nuevo Dataframe que contiene la unión de 2 Dataframes
  • where: Es un alias de filter

Tarea 6

A entregar de manera individual el 20 marzo 2018 máximo 23:59:59 CST en tu carpeta alumnos/nombre_apellido/tarea_6/

Las mismas preguntas que hiciste para la tarea 5 hacerlas con Spark :)

Ejercicio 1. Con la base de datos de northwind que se encuentran en el dropbox:

  1. ¿Cuántos “jefes” hay en la tabla empleados? ¿Cuáles son estos jefes: número de empleado, nombre, apellido, título, fecha de nacimiento, fecha en que iniciaron en la empresa, ciudad y país? (atributo reportsto, ocupa explode en tu respuesta)
  2. ¿Quién es el segundo “mejor” empleado que más órdenes ha generado? (nombre, apellido, título, cuándo entró a la compañía, número de órdenes generadas, número de órdenes generadas por el mejor empleado (número 1))
  3. ¿Cuál es el delta de tiempo más grande entre una orden y otra?

Ejercicio 2. Con los archivos de vuelos, aeropuertos y aerolíneas que están en el dropbox

  1. ¿Qué aerolíneas (nombres) llegan al aeropuerto “Honolulu International Airport”?
  2. ¿En qué horario (hora del día, no importan los minutos) hay salidas del aeropuerto de San Francisco (“SFO”) a “Honolulu International Airport”?
  3. ¿Qué día de la semana y en qué aerolínea nos conviene viajar a “Honolulu International Airport” para tener el menor retraso posible?
  4. ¿Cuál es el aeropuerto con mayor tráfico de entrada?
  5. ¿Cuál es la aerolínea con mayor retraso de salida por día de la semana?
  6. ¿Cuál es la tercer aerolínea con menor retraso de salida los lunes (day of week = 2)?
  7. ¿Cuál es el aeropuerto origen que llega a la mayor cantidad de aeropuertos destino diferentes?

Tu clúster se debe llamar nombre_apellido (o puedes ocupar una imagen)

¿Qué se entrega?

  • RMD y html con los queries que ejecutaste para cada pregunta/inciso
  • Los archivos de salida de Spark después de correr los scripts (puedes correrlos desde Hue)
  • Imagen del summary de tu clúster

¿Qué se califica?

  • No entregó RMD y html: -2
  • No agregó imágenes: -2
  • No agregó el código de cada script: -4
  • No agregó archivos de salida del clúster (part-r-00000): -4
  • Ejercicio 1:
    • a: 1
    • b: 1
    • c: 1
  • Ejercicio 2:
    • a: 1
    • b: 1
    • c: 1
    • d: 1
    • e: 1
    • f: 1
    • g: 1

Total: 10

UDF

Debido a que estaremos ocupando un lenguaje de propósito general, las UDF en Spark realmente se ocupan más cuando quieres ocupar una función anónima sin tener que guardarla como una función normal de Python.

  • Recordando cómo es una lista comprehensiva en Python (3.5.2):

\(\rightarrow\) correr en consola

numbers = [1,2,3,4,5,6]
[n * 2 for n in numbers if n % 2 == 1]
  • Recordando cómo es una función anónima en Python (funciones lambda):

El siguiente snippet de Python utiliza la función de Python (3.5.2) sorted que requiere 2 parámetros como entrada: la “cosa” a ordenar, y la “llave” con la cuál se va a ordenar la “cosa”. La función lambda en este snippet está buscando en cada elemento del diccionario name_candidates_col el valor asociado a la llave distance.

name_candidates_col.append({'name':name[1], 'col':col, 'row':row, 'distance':distance})
...

sorted(name_candidates_col, key=lambda x: x['distance'], reverse=False)

Ahora si… un ejemplo de un UDF en Spark sería algo así:

import pyspark.sql.functions as f


#definicion de UDF
vowels = f.udf(lambda x: len(re.findall('(?i)([aeiou])', x))/len(x) if (x is not None and len(x) > 0) else 0)
...
#uso de esa UDF
def custom_regex(df, col_name):
  ...
  f.avg(vowels(df[col_name])).alias(col_name + "_avg_vowels"),
  ...

En este snippet estamos definiciendo una UDF llamada vowels que por dentro ocupa una función lambda que extrae las vocales de una observación, saca su longitud, la divide entre la longitud total de la observación -si y solo si la observacion tiene un valor! (longitud > 0 y not None), de otra manera regresa 0 como número de vocales-. Más adelante ocupamos esta función en la definición de otra función (esta vez sin UDF).

Zeppelin


Zeppelin (0.7.3) es un notebook multipropósito (piénsalo como un jupyter) que permite ocupar varios framewokrs y lenguajes en el notebook para realizar análisis de datos en un mismo ambiente, su motor por detrás es Spark, por lo que tiene todas las integraciones a los diferentes elementos del ecosistema de Spark.

A través de Zeppelin es posible ralizar ingesta de datos (de HDFS y S3), análisis de datos, visualización de datos y colaboración.


Para ocupar los diferentes intérpretes dentro de Zeppelin basta con agregar antes de cualquier cosa el nombre del engine que tiene que ocupar Zeppeling para “interpretar”. Por ejemplo, para ocupar el intérprete de python hay que poner %python también se puede poner un sabor específico de python como %python.conda.

Zeppelin tiene integrado matplotlib, panda, md, shell


  • Pyspark


  • Python


  • Shell
  • md

Spark ML

spark.ml es el módulo de machine learning de Spark, diseñado para realizar machine learning dentro de spark de manera escalable y “sencilla”.

Características

  • Tiene algoritmos de ML ya implementados con la modificaciones necesarias para aprovechar el ambiente en paralelo donde vive Spark: clasificación, regresión, agrupación, filtros colaborativos, etc.
  • Tiene implementaciones de funciones que ocupamos para hacer feature engineering: feature extraction, feature selections, transformaciones, reducción de dimensionalidad
  • Permite generar pipelines en Spark al brindar herramientas para construir pipelines, evaluación y tuneo de pipelines de ML
  • Tiene una libería de utilerías con algebra lineal, estadística, manejo de datos, etc.

¿Por qué hay un spark.mllib y un spark.ml?

En la primer versión de Spark no existía la abstracción de DataFrame -el wrapper de los RDD- y todos los algoritmos de ML desarrollados en Spark interactuaban directamente con el RDD, todas estas implementaciones se encuentran en el paquete spark.mllib -que ya está descontinuada-. Una vez que salió la versión 2 de Spark y con ella los nuevos objetos SparkSession y DataFrame los algoritmos de ML fueron modificados -algunos- para que solo tengan interacción con la abstracción DataFrame y con ello surgió la librería spark.ml que es la que utilizaremos nosotros. Aún no están todos los algoritmos de spark.mllib implementados en DataFrame, la paridad en desarrollo está esperado alcanzarse en la versión 2.2 (esa ya existe) y la implicación más grande de no ocupar la librería spark.mllib es que a partir de Spark 3.0 la librería será removida completamente de Spark para dejar lugar a solo la interacción con los DataFrames.

Anyway Para confundir más a la banda, el nombre oficial de la herramienta que ocupa Spark para ML se conoce como MLlib (╯°□°)╯︵ ┻━┻ aunque realmente se refieren a la librería spark.ml.

Spark ML API

Pipelines

El diseño de los pipelines de Spark está inspirado en los pipelines de scikit-learn -los vimos en la clase de Into to DS-. Un pipeline en Spark está formado por los siguientes elementos:

  • DataFrame: API que ocupa los DataFrame de SparkSQL para poder agregar otros tipos de datos que pueden ser útiles para ML -vector-

  • Transformer: Algoritmo que transforma un DataFrame en otro DataFrame, recuerda que los DataFrame en Spark envuelven a un RDD y un RDD no puede ser modificado!. Para hacer una transformación se ocupa el método transform(). Los casos en los que ocuparemos un transform pueden ser agregar una nueva columna -por ejemplo feature engineering-, o por ejemplo una vez que se ha pasado un modelo de aprendizaje poner la respuesta final del modelo como parte del DataFrame original -etiqueta, score-.

  • Estimator: Algoritmos que se aplican a un DataFrame para producir un Transformer. Los estimators son los que ocupan el método fit() para poder realizar un entrenamiento, el método fit recibe como parámetro un DataFrame y devuelve un modelo -que es un transformer-. Por ejemplo: Un algoritmo de regresión lineal es un estimator que tiene su método fit a través del cual entrena el algoritmo.

\[\text{fit} \rightarrow \text{transformer}\]

\(\rightarrow\) Es importante conocer que por cada instancia de un transformer o estimator se genera un ID a través del cuál es reconocido durante todo el pipeline

  • Pipeline: Secuencia de procesos/etapas generado por transformers y estimators para hacer un workflow de ML. Cada transformer/estimator es una etapa dentro de la secuencia del pipeline, cada paso se corre en el orden establecido y el DataFrame de entrada es transformado por cada paso, si el paso es un transformer entonces se le aplica el método transform y si el paso es un estimator se le aplica el método fit.


Por ejemplo: Si tuviéramos un texto al cuál quisieramos aplicarle un análisis de sentimiento, el pipeline podría consistir en los siguientes pasos:

  • Separar cada documento en palabras
  • Covertir cada palabra de cada documento en un vector numérico
  • Utilizando el vector numérico y las etiquetas asociadas -del sentimiento- ocupar un modelo de clasificación


* Fuente: Spark ML Guide

En Spark, un pipeline es un estimator por lo que puede hacer llamada al método fit, al hacer esto se genera un PipelineModel -que es un transformer-. Cuando querramos ocupar modelos entrenados para producción deberemos ocupar el PipelineModel generado en el momento de entranamiento al hacer una llamada a su método transform, de esta manera todos los estimators del pipeline original son convertidos a transformer asegurándonos de que en pruebas tendremos los mismos pasos/trasnformaciones ocupados para el entrenamiento del modelo. ╭(◔ ◡ ◔)/


*Fuente: Spark ML Guide

Un Pipeline en Spark está representado como un DAG, el ejemplo anterior es un DAG lineal, pero no necesariamente deben ser lineales, basta con que cumplan las características de ser un DAG -grafo acíclico dirigido-. Es por esta razón que cada instanciación de un transformer o estimator tiene asociado un ID y debe ser único, si necesitaramos un mismo transformer en el pipeline requerimos de generar otro transformer -aunque tenga el mismo código- :( (ya sé! esto medio que le da en la ma al principio de reuso pero … por el momento así se resuelve en Spark en pro de tener un pipeline), Spark revisa en tiempo de ejecución revisa que no se rompa “algo” antes de correr el pipeline -lazy-

  • Parameter: API con la que se pueden compartir parámetros entre Estimators y Transformers. Ocupamos el objeto Param que es un parámetro nombrado con documentación auto contenida en un ParamMap -diccionario de parámetro, valor-.

En Spark hay dos maneras de pasar parámetros a los algoritmos de ML:

  1. Configurar los parámetros fijos de los algoritmos a ocupar (setters)
  2. Pasar un ParamMap con los parámetros y sus valores a través de fit o transform, si se envían parámetros de esta manera se hace override a los específicados vía setters

Lo lindo de estos objetos es que cada definición dentro del ParamMap es “atado” a un estimator o transformer en específico -a través el ID antes mencionado-. Por ejemplo: si tuvieramos en un pipeline dos regresiones logísticas -lr1 y lr2- podríamos ocupar un ParamMap que establezca es valor de las iteraciones máximas de cada regresión:

#pyspark

ParamMap{lr1.maxIter: 10, lr2.maxIter: 20}

¿Para qué tomarse la molestia de hacer estructuras y procesos para que Spark pueda manejar pipelines de ML?

  • Recuerda que Spark está hecho como herramienta principal para un data scientist, pensado y diseñado tomando en cuenta su usuario principal
  • ¿Recuerdas el hyperparameter tuning, magic loop, multi-armed bandit?

╭(◔ ◡ ◔)/

  • I/O de pipeline o modelos

Desde Spark 1.6 se agregó la posibilidad de guardar modelos y pipelines implementados en Spark para poder ocuparlos después, pero no todos los algoritmos de spark.ml tienen esta posibilidad -tiene que ver con la paridad que mencionamos al principio de la libería spark.mllib y la spark.ml-, por lo que se requiere revisar la documentación específica de cada algoritmo y ver si se puede y cómo. ¯\(ツ)

El paquete spark.ml.util tiene los objetos MLWriter y MLReader que permiten guardar y cargar modelos sin importar el lenguaje en el que se hayan implementado -Scala, Java, Python o R (implementaciones especificamente para Spark!)-

Pipeline: Spark ML API

  • En Spark 2.1 existe el método save(path) para poder guardar un pipeline en el path indicado, es un shortcut a write.save(path)
  • En Spark 2.1 existe el método load(path) para poder cargar un pipeline, es un shortcut a read.load(path)

Modelos:

Los modelos que tengan posibilidad de ser guardados tendrán los métodos de save(path) y load(path) (verificar documentación del API para el modelo ocupado)

Ejemplos:
  1. Veremos un pequeño ejemplo de cómo ocupar los estimators y tranformers (viene en la documentación de Spark), NOTA: Los comentarios en el código no tienen acentos
#pyspark 
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

# En Spark se puede crear un DataFrame de un RDD, de una lista o de un DataFrame de Pandas, 
# aquí lo estamos creando con una lista que contiene tuplas de (label, features)
# tal cual lo hacíamos en sklearn, y le estamos agregando los nombres de cada columna.
# Vectors.dense recibe una lista como parámetro
# Este DataFrame lo estamos ocupando como nuestro set de entrenamiento mock!
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

# Como lo hacíamos en sklearn, primero instanciamos el modelo que quremos
# ocupar con los parámetros que nosotros queremos tener para este 
# modelo en particular --configuramos el modelo--
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Veamos la documentacion del modelo y que parametros le pusimos a nuestra
# configuracion
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

# Ocupemos el modelo que configuramos para entrenar con lo datos que
# creamos en el DataFrame training
model_1 = lr.fit(training)

# model_1 es un transfomer creado a traves de un estimador (LogisticRegression)
print("Model 1 was fit using parameters: ")
# aqui estamos obteniendo la configuracion con la que se entreno
# la regresion logistica que ocupamos
print(lr.extractParamMap())


# Tambien podemos especificar los parametros con los que queremos que 
# corra el modelo utilizando el diccionario de ParamMap
# Creamos un diccionario -se puede llamar como quieras!- que tenga
# como llave el nombre del parametro que quieres modificar, con el valor
# correspondiente.
param_map = {lr.maxIter: 20}
# Si el valor ya existe en el diccionario puedes actualizarlo
param_map[lr.maxIter] = 30  
# Tambien puedes actualizar varios parametros del diccionario al mismo tiempo
param_map.update({lr.regParam: 0.1, lr.threshold: 0.55}) 

# Se pueden combinar diferentes diccionarios...realmente puedes tener
# un solo diccionario con los parametros de diversos modelos que ocupes en el
# pipeline sin ningun problema, pues el valor asociado es por objeto (ID)
# aqui estamos cambiando el nombre de la columna que guarda la salida del
# modelo, por default se llama 'probability' -> verificar documentacion del 
# metodo
param_map_2 = {lr.probabilityCol: "my_probability"}  
param_map_combined = param_map.copy()
param_map_combined.update(param_map_2)
#puedes ver el contenido del diccionario con param_map_combined.items() -> python 3.5.2

# Entrenemos una segunda regresion logistica con los nuevos parametros que 
# establecimos a traves del paramMap
# En este fit estamos enviando tanto los datos como los parametros a ocupar en el
# modelo de regresion logistica
model_2 = lr.fit(training, param_map_combined)
print("Model 2 was fit using parameters: ")
# aqui queremos ver cono que parametros se quedo configurado el modelo
# que ocupamos para entrenar
print(lr.extractParamMap())

# Creemos el data frame que tendra los datos de prueba mock!
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
# Ahora si, hagamos predicciones sobre el set de pruebas ocupando el cerebro
# antes entrenado utilizando el transform
prediction = model_2.transform(test)
# la respuesta es un DataFrame (la salida de un transform en su DataFrame)
# por lo que podemos aplicarle los metodos de SparkSQL :)
# verificamos que si es un DataFrame...
type(prediction)
# veamos que columnas tiene este DataFrame (como el names de R)
prediction.columns
# Aqui estamos seleccionando las columnas features, label, 
# my_probability -> que es el nombre que nosotros especificamos anteriormente en
# ParamMap, y la columna prediction que es el nombre por default que regresa
# el modelo al parametro 'predictionCol' -> ver documentacion
# el collect hara que se regresen los resultados al drive!!! 
result = prediction.select("features", "label", "my_probability", "prediction") \
    .collect()


for row in result:
    print("features={}, label={} -> prob={}, prediction={}".format( \
    row.features, row.label, row.my_probability, row.prediction))


Pasaron demasiadas chivas como para quedarse quietesitos… ¡Hagámoslo! = Hazlo :P

¿Qué demonios pasó aquí?


  1. Ahora hagamos el ejemplo de texto para ver como se hacen los pipeline en Spark (viene en la documentación de Spark), NOTA: Los comentarios en el código no tienen acentos:
#pyspark
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
    
# al igual que en el ejemplo anterior, creamos un dataframe a traves
# de una lista con los datos de entrenamiento, la lista esta formada
# por tuplas (id, texto, label). Esta forma no nos servirá para poder meterla
# en los objetos de ML, pero mas adelante arreglaremos esto
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])


# Definimos los transformers: Tokenizer y HashingTF, y los 
# etimators: LogisticRegression que ocuparemos. Nota que aqui no hemos hecho
# ningun fit todavia... la magia vendra mas adelante ;)
# Tokenizer convierte el string de entrada (inputCol) a minusculas y separa en
# palabras utilizando ocmo separador el espacio
tokenizer = Tokenizer(inputCol="text", outputCol="words")
# HashingTF permite hashear cada palabra utilizando MurmurHash3 convirtiendo
# el hash generado en el indice a poner en el "TDM". Este metodo optimiza el
# tiempo para generar el TDM de TF-IDF "normal". Para evitar colisiones en
# la conversion a hash se aumenta el numero de bucket -se recomienda ocupar
# potencias de 2 para balancear las cubetas-
# Nota que en este transformet estamos ocupando como entrada la salida del
# transformer Tokenizer
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
# Ocuparemos una regresion logistica de nuex
lr = LogisticRegression(maxIter=10, regParam=0.001)
# Aqui viene lo bonito... definimos un pipeline que tiene como etapas/pasos
# primero el tokenizer, luego el hasing y luego la regresion logistica. Aqui
# estamos definiendo el flujo de procesamiento, el DAG! 
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])


# Voila, solo se requiere de hacer fit al pipeline para que esto funcione
# como un pipeline, siguiendo el orden de los pasos establecidos en la 
# definicion del pipeline :) ... recuerda que el fit solo es como 
# el entrenamiento una vez que ya definimos las configuraciones de 
# los objetos que ocuparemos (transformers y estimators)
model = pipeline.fit(training)

# Creamos el dataframe de pruebas mock! -> Nota que aqui no hay 
# label!!!! (asi funcionaria en produccion cierto!)
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])


# Lixto, "ejecutamos" el pipeline haciendo un transform al pipeline para 
# obtener las predicciones del set de pruebas
prediction = model.transform(test)

# De nuevo, prediction es un DataFrame generado con un transformer generado
# a traves de estimadores y transformers :) 
# Seleccionamos las columnas que queremos ver 
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("({}, {}) --> prob={}, prediction={}".format( \
    rid, text, str(prob), prediction))



Feature engineering

En este módulo de la librería de ML (pyspark.ml.feature) se encuentran las funciones asociadas a las siguientes acciones:

  • Feature extraction: Extraer características -features- de datos crudos -raw-
  • Feature transformation: Escalar, convertir o modificar características -features-
  • Feature selection: Seleccionar un subconjunto de variables/características -feataures- de un conjunto más amplio
  • Locality Sensitive Hashing (LSH): Algoritmos que se ocupan para obtener feature transformation

En el ejemplo del texto ocupamos métodos de feature transformantio: Tokenizer, y de LSH: HashingTF. En esta parte veremos ejemplos de los métodos más utilizados en cada una de las categorías mencionadas, esta parte para nada es exhaustiva pues Spark cuenta con muchos métodos implementados, solo es para que se den una idea de cómo se ocupan en Spark. (Spark ML feature API)


*Fuente: Spark ML Guide

  1. Feature extraction
  1. TF-IDF

Spark TF-IDF

Solo para recordar, TF-IDF es un algoritmo de minería de texto ocupado normalmente en problemas de IR a través del cual contando la frecuencia de aparición de una palabra en todo la colección de documentos y en la frecuencia dentro de cada documento se establece la relevancia de un documento dado un query de búsqueda.

\[tf\_idf=tf \cdot log_{10}\frac{N}{df}\]

En este ejemplo, TF-IDF se ocupa como una transformación a una variable raw -las palabras- para se ocupadas como un feature en otro algoritmo de aprendizaje de máquina.

#pyspark
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer

# Creamos nuestro set de entrada para formar la TDM
sentence_data = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

# Ocupamos el transformer Tokenizer para separar por palabras
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
# Aqui no hay train! porque no estamos entrenando nanda... estamos en un problema
# de IR. Tokenizer no tiene un metodo fit -no hay entrenamiento-
words_data = tokenizer.transform(sentence_data)

# Ocupamos el transformer CountVectorizer para generar una matriz de 
# terminos y sus frecuencias 
count_vectorizer = CountVectorizer(inputCol="words", outputCol="raw_features")
featurized_model = count_vectorizer.fit(words_data)
featurized_data = featurized_model.transform(words_data)
featurized_data.show(truncate=False)

# Ocupamos IDF para obtener el IDF de la coleccion de documentos mock que 
# generamos. IDF si tiene un metodo fit a traves del cual le enviamos el set 
# de tokens al que queremos obtener el IDF
idf = IDF(inputCol="raw_features", outputCol="features", minDocFreq=1)
# Aqui obtenemos el modelo a ocupar (transformer) a ocupar 
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)

rescaled_data.select("label", "features").show(truncate=False)



  1. Feature transformation
  1. OneHotEncoder

Solo para recordar, one hot encoding transforma una variable categórica de \(n\) categorías a \(n\) variables binarias, normalmente ocupamos esta transformación para ocupar variables categóricas en algoritmos que solo ocupan representaciones numéricas -normalmente aquellos algoritmos que ocupand distancias-

#pyspark
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# creamos nuestro set de datos de entrada categorico
df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

# Esta funcion agrega un id numerico a cada valor diferente de un valor categorico 
# es como establecer los niveles en R de una factor pero los niveles son numericos,
# sus id. El indice se establece por orden de frecuencia (descendente), por lo que 
# el indice 0 corresponde a la variable que aparece con mas frecuencia
string_indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = string_indexer.fit(df)
indexed = model.transform(df)
indexed.show()

# OneHotEncoder no tiene un fit ya que solo es un transformador
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()




  1. MinMaxScaler

Es la normalización de minería de datos \(\frac{x-min}{max-min}\)

#pyspark
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

data_frame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])
data_frame.show()

# Configuramos el estimator MinMaxScaler como lo necesitamos
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

# Creamos el modelo MinMaxScaler (transformer)
scaler_model = scaler.fit(data_frame)

# Transformamos los datos reescalando 
scaled_data = scaler_model.transform(data_frame)
# Nota que cuando pedimos getMin y getMax lo hacemos al estimator, no al modelo
print("Features scaled to range: [{}, {}]".format(scaler.getMin(), scaler.getMax()))
scaled_data.select("features", "scaled_features").show(truncate=False)



  1. StandardScaler

Corresponde a la estandarización en minería de datos \(\frac{x-\mu}{\sigma}\)

#pyspark
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StandardScaler

# Creamos el data frame que queremos estandarizar
data_frame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])
# Configuramos el estimator StandarScaler como lo necesitamos (por default
# withMean esta en False porque hace que se regrese un vector dense...
# hay que tener cuidado con eso cuando estemos manejandoo vectores sparse
scaler = StandardScaler(inputCol="features", outputCol="scaled_features",
                        withStd=True, withMean=True)
# Creamos el modelo StandardScaler para los datos de entrada
scaler_model = scaler.fit(data_frame)

# Transformamos los datos 
scaled_data = scaler_model.transform(data_frame)
scaled_data.show(truncate=False)



s

  1. Feature selection

\(\rightarrow\) Además de los métodos que contiene este submódulo, los algoritmos más ocupados para la selección de variables se encuentran dentro del de Feature transformation: PCA, o bien son algoritmos dentro de spark.ml: DecisionTrees, RandomForests.

  1. ChiSqSelector

Este método se ocupa para variables categóricas, se realiza una prueba de independencia \(\chi^2\) (\(\chi^2\) test) para seleccionar los mejores features. Hay 3 tipos de selección que se pueden hacer:

  • numTopFeatures selecciona los top n features de acuerdo a la \(\chi^2\) test, es como seleccionar los features que mayor información predictiva tienen
  • percentile similar a numTopFeatures pero seleccionando un porcentaje en lugar de un numero fijo
  • fpr selecciona aquellos features cuyo p-value sea menor a un cierto threshold controlando los falsos positivos
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

# creamos nuestro set de datos con features
df = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])

# Configuramos el estimator
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="clicked")
# Creamos el modelo de ChiSquare y luego ocupamos el transform para 
# seleccionar los features con mas informacion
result = selector.fit(df).transform(df)

print("ChiSqSelector output with top {} features selected".format( \
selector.getNumTopFeatures()))
result.show()



  1. PCA

En el siguiente ejemplo aplicaremos PCA a un data set de 5 variables -dimensiones- para dejarlo en uno de 3, tomando las primeras 3 componentes principales.

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

# Creamos el data frame de 5 dimensiones, nota que en este data frame estamos
# ocupando un vector de tipo sparse en donde solo ponemos la dimension del vector
# los indices donde los valores son diferentes de 0 y los valores de esos indices
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

# Configuramos el modelo 
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
# Creamos el estimator PCA
model = pca.fit(df)
# Transformamos a PCA los valores quedandonos con los primeros 3 componentes
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)



Clasificación y regresión

  1. RandomForest

Cargaremos un dataset de prueba que se encuentra en formato libsvm, en este formato cada línea tiene un vector sparse con etiquetas. En un vector sparse se guarda la etiqueta seguida de los índices que tienen un valor diferente de 0 y el valor asociado label index1:value1 index2:value2 .... Por ejemplo:


from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Cargamos los datos de prueba
data = spark.read.format("libsvm").load("sample_libsvm_data.txt")

# Indexamos las categorias que existen de la columna label
# y transformamos el estimator a modelo (transformer)
label_indexer = StringIndexer(inputCol="label", outputCol="indexed_label").fit(data)

# El VectorIndexer es un estimator que identifica que variables son categoricas
# definiendo como categoricas aquellas que tienen menos de 5 valores diferentes
# si la variable tiene mas de 5 valores diferentes se tomara como variable continua
feature_indexer =\
    VectorIndexer(inputCol="features", outputCol="indexed_features", maxCategories=4).fit(data)

# separamos los datos en entrenamiento y pruebas (70 y 30)
(training_data, test_data) = data.randomSplit([0.7, 0.3])

# configuramos el modelo de RandomForest con los parametros que queremos
# normalmente el numero de arboles, numero de elementos minimo que un nodo puede 
# tener para ser dividio (nodos hoja), algoritmo de impureza a ocupar: gini o entropia
# a diferencia de R y scikitlearn, aqui no se puede especificar el numero de 
# variables a tomar en cuenta por cada arbol, spark toma el default (sqrt(n))
rf = RandomForestClassifier(labelCol="indexed_label", featuresCol="indexed_features", numTrees=10)

# Convertimos las etiquetas indexadas a sus valores originales
label_converter = IndexToString(inputCol="prediction", outputCol="predicted_label",
                               labels=label_indexer.labels)

# Hacemos el pieline!! :) definiendo los pasos y su orden
pipeline = Pipeline(stages=[label_indexer, feature_indexer, rf, label_converter])

# Entrenamos :) con los datos de prueba
model = pipeline.fit(training_data)

# Modificamos el estimator a transformer para hacer predicciones con los datos de prueba
# ocupando el cerebro entrenado en fit
predictions = model.transform(test_data)

# Seleccionamos las columnas de nuestro interes (los 5 primeros registros)
predictions.select("predicted_label", "label", "features").show(5)

# Ahora evaluaremos el desempenio del modelo, para ello ocupamos el 
# modelo MulticlassClassificationEvaluator que se encuentra en el modulo 
# pysaprk.ml.evaluation.  Esta funcion permite evaluar el desempenio de un 
# modelo de ML, le pasamos la columna que tiene las etiquetas (originales) 
# y la etiqueta predicha por el modelo en metricName le especificamos la medida 
# de desepenio en la que queremos poner atencion -> recuerda el pequenio 
# detalle de solo mirar accuracy y no precision, recall (o f1 score en el 
# mejor de los casos). Por el momento solo es posible obtener precision, recall y
# accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexed_label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = {:f}".format(1.0 - accuracy))

rfModel = model.stages
print(rfModel)  # summary only




Agrupamiento

  1. K-Means
#pyspark
from pyspark.ml.clustering import KMeans

# Creamos el data frame con los datos que ocuparemos mock
dataset = spark.createDataFrame([
    (0, Vectors.dense([0.0, 0.0, 0])),
    (1, Vectors.dense([0.1, 0.1, 0.1])),
    (2, Vectors.dense([0.2, 0.2, 0.2])),
    (3, Vectors.dense([9, 9, 9])),
    (4, Vectors.dense([9.1, 9.1, 9.1])),
    (5, Vectors.dense([9.2, 9.2, 9.2]))], ["label", "features"])

# Configuramos  el KMeans con 2 grupos
kmeans = KMeans().setK(2).setSeed(1)
# Creamos el estimator Kmeans 
model = kmeans.fit(dataset)

# Vemos que tan bien o mal nos fue, obteniendo el SSE de los puntos a
# su centro
wssse = model.computeCost(dataset)
print("Within Set Sum of Squared Errors = " + str(wssse))

# Obtenemos centros 
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)



Selección de modelos y tuneo

En Spark es posible hacer el tuneo de hiperparámetros de un modelo específico -por ejemplo RandomForestClassifier- o si este modelo forma parte de un pipeline se puede hacer tuneo del pipeline completo :)

Hyperparameter tuning

El semestre pasado hablábamos de un grid de parámetros de búsqueda que queríamos que se probaran de manera automática, en Spark este grid se puede especificar en la función ParamGridBuilder.

Para hacer selección de modelos en Spark se requiere de los siguientes elementos:

  • El Estimator que se desea tunear -ya sea un algoritmo específico o un Pipeline completo, recordemos que Pipeline también es un Estimator-
  • El conjunto de parámetros que se quieren probar -el parameter grid-
  • La métrica a evaluar para medir el desempeño del modelo -el evaluator: RegressionEvaluator, BinaryClassificationEvaluator o MulticlassClassificationEvaluator-

El proceso general de selección de modelos es el siguiente:

  1. Dividir los datos en set de entrenamiento y pruebas
  2. Para cada par de entrenamiento/pruebas iterar sobre un conjunto de parámetros -ParamMap-
    • Entrenar el estimador seleccionado (fit) con los parámetros, obtener el modelo entrenado, evaluar el desempeño del modelo utilizando el Evaluator
  3. Seleccionar el modelo con el mejor desempeño
  1. Cross-Validation = K-fold cross validation

En este ejemplo ocuparemos un ParamGrid para poder hace evaluación de varios modelos, ocuparemos el ejemplo de texto que habíamos ocupado antes para ejemplificar el uso de pipelines en Spark. Probaremos con 3 valores diferentes para la parte de HashingTF -3 diferentes números de cubetas-, 2 valores diferentes para la regresión logística y ocuparemos un \(k\) de 2 para el cross validation. Estos cambios nos llevarán a tener \((3*2)*2=12\) modelos diferetnes. Hacer selección de parámetros a través del grid es normalmente muy costoso pues requerimos de explorar todas las opciones que explícitamente ponemos en el mismo, existen otros algoritmos para hacer el tuneo de hiperparámetros, puedes encontrar las ligas en la sección de referencias (Hyperparameter tuning)

#pyspark
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Creamos un DataFrame con ids, texto, label
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0)
], ["id", "text", "label"])

# Configuramos los elementos que formaran parte de nuestro pipeline: Tokenizer, HashingTF,
# regresion logística y pipeline
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Definimos uno de los 3 elementos que necesitamos para hacer hpyperparameter tuning
# el grid de parametros.
# Especificamos el grid que queremos explorar agregando los parametros
# que queremos modificar de cada estimator y poniendo los valores que 
# queremos explorar para cada uno de ellos en una lista. 
# En este grid estamos estableciendo que queremos explorar tener 10
# cubetas, 100 y 1000 (siempre ocupar valores pares para que las cubetas
# queden balanceadas); para la regresion logistica ocupamos dos parametros de 
# regularizacion
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

# Definimos el estimador que queremos evaluar -recuerda que pipeline es un estimator-
# y el evaluador que ocuparemos para determinar el desempenio de cada modelo
# por default el BinaryClassificationEvaluator toma como metrica de desempenio 
# el AUC -metricName-, solo hay AUC o area under precidion recall curve
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  # ocupar minimo 3 - 10 normalmente!

# Cross validation tambien es un estimador, le hacemos fit
cvModel = crossval.fit(training)
# veamos cual fue el mejor modelo
cvModel.bestModel

# Creamos nuestro dataframe de pruebas
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Hacemos predicciones con el mejor modelos encontrado por el CrossValidator que es el
# que quedo en cvModel
prediction = cvModel.transform(test)
# seleccionamos las columnas que queremos ver y las imprimimos para ver el resultado
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    print(row)



Tarea 7

A entregar de manera individual máximo el 24 de abril de 2018 23:59:59 CST (-0.5 por cada día de retraso) en tu carpeta alumnos/nombre_apellido/tarea_7

Con los datos que tenemos de flights queremos predecir el tiempo de retraso de salida DEPARTURE_DELAY

  • Si requieres de hacer transformaciones a los datos, feature selection y/o feature engineering tendrás que hacerlos como parte de un pipeline
  • Deberás dividir el set en entrenamiento y pruebas (70 y 30)
  • Deberás ocupar 10 como valor de k en cross validation
  • Deberás seleccionar 2 algoritmos de tu preferencia para realizar la predicción
  • Necesitas generar un gridParamMap para modificar los parámetros de los algoritmos que seleccionaste, al menos deberás de tener 3 valores diferentes en 2 de los parámetros. Por ejemplo: si seleccionaste un random forest puedes modificar el número de árboles y el número de elementos mínimo para no seguir dividiendo (con 3 valores diferentes)
  • Necesitas generar un magic loop para probar probar tus dos diferentes algoritmos
  • Deberás seleccionar los mejores parámetros por algoritmo a través de un evaluador (como lo vimos en el ejemplo)
  • ¿Qué parametros resultaron mejor por algoritmo? (ver el método bestModel)
  • ¿Qué algoritmo resultó el mejor, con qué parámetros?
  • Indicar el tiempo de ejecución de la función de magic loop (ocupa python directo con la librería timeit)

¿Qué se entrega?

  • El script de pyspark (con comentarios en tu código)
  • El diagrama de elementos de tu código (transformadores, estimadores, data frames, pipeline, etc)
  • Foto del summary de tu clúster
  • Foto del step en verde de tu script
  • El zeppelin (si lo hiciste en zeppelin), jupyter (si lo hiciste en jupyter)
  • Un Rmd y html que incluye el diagrama de elementos y las respuestas a las 2 últimas preguntas
  • Tiempo de ejecución del magic loop (punto anterior)

¿Qué se califica?

  • No entrego Rmd y html: -2
  • No entregó imágenes: -2
  • No entregó script de pyspark: -3
  • Diagrama de elementos del proceso de ML en Spark: 2
  • Cross-validation: 0.5
  • 2 algoritmos: 0.5
  • GridParam: 1
  • MagicLoop: 3
  • Evaluador para selección de parámetros: 0.5
  • Pipeline: 1
  • Mejores parámetros: 0.5
  • Mejor algoritmo con mejores parámetros: 0.5
  • Tiempo de ejecución: 0.5

Total: 10

Spark Streaming

El módulo de Spark que permite procesar datos en live-data stream, al ser parte de spark comparte sus cualidades de ser escalable, con un throughput alto y con tolerancia a fallos en los streams.

Spark streaming puede ingerir datos de diversas fuentes -tecnologías/frameworkds de streaming-: Apache Kafka (flujos de datos en stream), Apache Flume (especializado en archivos log), AWS Kinesis , sockets TCP. Los datos procesados por Spark Streaming se pueden enviar a sistemas de archivos distribuidos, bases de datos y/o dashboards.

*Fuente: Spark streaming guide

Lo que hace Spark Streaming es recibir un stream de datos y dividirlos en batches de datos que después puedan ser procesados por cualquier elemento del ecosistema de Spark.

*Fuente: Spark streaming guide

Hay dos elementos importantes en el módulo de Spark Streaming:

  1. DStream: Una abstracción que discretiza los streams para convertirlos en batches. Consiste de una secuencia de RDDs. Se pueden crear a partir de otro DStream, o de las diferentes fuentes de datos que manejan stream
  2. StreamingContext: El punto de entrada a todo la funcionalidad de streaming de Spark. Es context porque iteractúa directamente con los DStream (que son secuencias de RDD) y por lo tanto requeire del SparkContext para ser creado.

Conceptos en Spark Streaming

Inicialización del StreamingContext

\(\rightarrow\) para crear un StremaingContext debemos de obtener el sparkContext al que está asociado nuestro sparkSession de la siguiente manera:

#pyspark
from pyspark.streaming import StreamingContext

sc = spark.sparkContext
sc

# requerimos el sparkContext de nuestro sparkSession e indicarle
# el intervalo de tiempo para hacer cortes en el streaming y generar
# los batches de datos
ss = StreamingContext(sc, 1)
ss


Una vez que ya tenemos inicializado el StreamingContext tenemos que hacer lo siguiente:

  • Definir las fuentes de entrada al stremaing creando DStreams
  • Definir los procesamientos que haremos a los datos del stream aplicando transformaciones y operaciones sobre los DStreams
  • Empezar a recibir datos por streaming y procesarlos utilizando el streamingContext.start()
  • Esperar a que termine el procesamiento de los datos -manualmente o por error- utilizando streamingContext.awaitTermination()
  • Se puede detener el procesamiento manualmente utilizando streamingContext.stop()

  • \(\rightarrow\) Una vez que un cotexto se inicia no se puede configurar nuevos procesamientos o agregar a existentes
  • \(\rightarrow\) Una vez que el cotext se detiene no puede ser reiniciado
  • \(\rightarrow\) Solo se puede tener un StreamingContext activo por JVM al mismo tiempo (como el SparkSession)
  • \(\rightarrow\) Se puede ocupar un mismo SparkContext para crear diferentes StreamingContext siempre y cuando el StreamingContext anterior haya sido detenido (sin parar el SparkContext) antes de que el siguiente sea creado

Discretizar streams DStreams

Los DStream internamente representan una serie de RDDs -Resilent Distributed Datasets, recuerda que son inmutables-. Cada RDD en un DStream tiene una parte de los datos del intervalo del stream que se consumió. Cada operación que se realiza a un DStream se aplica a los RDDs que forman parte de ese DStream, ese es el beneficio más grande de los DStream, abstraen la implementación de aplicar cada operación que definimos a los RDD que lo contienen sin que nosotros nos tengamos que preocupar por lo que pasa por abajo, solo ocupamos el API de los DStream para aplicar operaciones y transformaciones y SparkStreaming se encarga de aplicarlos a todos los RDD correspondientes.


Input DStreams y Receivers

Un Input DStream es un DStream que representa los datos que vienen de una fuente de stream, cada input stream está asociado a un receiver -excepto los input stream que representan los datos de file streams-. La función de un receiver consiste en obtener los datos de la fuente de stream y guardarlos en Spark en memoria para procesarlos.

En Spark hay 2 tipos de fuentes de stream:

  1. Fuentes básicas: Las que ya vienen disponibles en el StreamingContext (sistemas de archivos y conexiones de sockets)

Paréntesis cultural: Un socket es una forma de comunicación entre 2 máquinas -normalmente cliente/servidor-, a través de este medio de comunicación se ocupan endpoints -direcciones ip específicas- con un puerto específico -nosotros lo podemos especificar- a través del cuál el cliente estará escuchando y enviando peticiones al server, y el server estará escuchando y enviando datos al cliente. Este tipo de comunicación ocupa el protocolo TCP (redes).


*Fuente: Oracle tutorials


*Fuente: Oracle tutorials

  1. Fuentes avanzadas: Las fuentes que vienen de tecnologías/frameworks de streaming: Kafka, Flume, Kinesis, etc. para integrar estas fuentes hay utilerías extra, y se requiere de hacer un mapeo -linking- de algunas dependencias.

Es posible tener varias fuentes de streaming al mismo tiempo -en paralelo-, para ello necesitamos tener suficientes cores (o hilos si corre localmente) corriendo para tener todos los DStream de entrada, los receivers y el procesamiento \(> \text{# de recievers}\)

Fuentes básicas
  • Sockets. Nos permite hacer streaming a través de sockets utilizando el método sockeTextStream(endpoint, port) del objeto stremaing context

  • Archivos (file streams). Nos permite hacer streaming de archivos alamacenados en DFS: HDFS, S3, NFS, etc. Para creara un DStream de este tipo se ocupa el método textFileStream(data_directory) del objeto streaming context. Todo archivo que viva en el directorio especificado será tomado como parte del stremaing, por ello todos los archivos bajo este directorio tienen que tener la misma estructura y formato interno (por el momento Spark no sabe manejar directorios anidados). \(\rightarrow\) Los archivos no pueden ser modificados, si se siguen agregando observaciones a los archivos -append- estos no serán procesados

  • Streams basados en receivers propios. Ver la guía de Custom receiver guide

  • Cola de RDDs como stream. Muy ocupado para probar una aplicación de Spark streaming a través de una serie de RDDs encolados, cada RDD es tomado como un batch. El método para esta funcionalidad es queueOfRDDs del objeto Streaming context

Fuentes avanzadas

Para este tipo de fuentes se require de ocupar librerías externas a Spark que requeiren de cumplir con dependencias, pero como arreglar todas las dependencias ha resultado en serios dolores de cabeza para la mayoría de íngenieros de datos, Spark ayudó generando librerías de Spark que hacen menos engorroso el mapeo de todas las dependencias, a esto se le llama linking para mapear las librerías de Spark de fuentes avanzadas a las tecnologías/frameworks que las ocupan: Kafka, Kinesis y Flume son las que en la versión de Spark 2.1 existen como implementación en pyspark.

Fuentes custom

Es posible crear un DStream para fuentes de streaming custom, para ello tenemos que implementar un user defined receiver que pueda recibir datos de la fuente custom y empujarlos a Spark. Ver Custom receiver guide

Es posible clasificar a los receivers de acuerdo a su nivel de confiabilidad -asegurarnos que no perdemos datos cuando estamos en el stream-. Los receivers que no envían una confirmación -acknowledge- de la recepción son considerados no confiables -unreliable receiver-, mientras que los que envían una confirmación de recepción son considerados confiables -reliable receiver-. La confirmación incluye enviar un mensaje a la fuente una vez que los datos han sido recibidos y guardados en Spark con replicación.

Ejemplo de streaming

Este ejemplo consiste en tener una fuente de streaming básica -un socket- a través del cuál enviaremos texto que será procesado después por Spark para contar el número de palabras que llegan a través del stream.

Ocuparemos el comando nc -lk 9999 en nuestro “server” para decirle que estaremos escuchando por el puerto 9999 (abrimos el socket en el puerto 9999), nc es un comando de netcat que es una utilería de Unix que se ocupa para leer y escribir datos a través de la red utilizando el protocolo TCP, el -lk es una bandera que indica que estaremos escuchando (l: listening) y que forzaremos a mantenernos escuchando aún cuando la conexión actual sea completada (-k: keep).

En nuesro cliente ejecutaremos nuestro script de python para recibir el streaming a través del endpoint localhost en el puerto 9999: ./bin/spark-submit ex_1.py localhost 9999

# word_count.py
import pprint

from pyspark import SparkContext
from pyspark.streaming import StreamingContext


def count_words():
    # Definimos el numero de cores que necesitamos, como este ejemplo corre local
    # ocupamos el local[n] que permite dedicar n numero de hilos al input DStream,
    # al receiver y al procesamiento.
    sc = SparkContext("local[2]")
    # definimos el intervalo de tiempo de stream (cada segundo generaremos)
    # batches de DStream
    streaming_context = StreamingContext(sc, 1)

    # Creamos un input DStream de tipo socket -basico- con el endpoint
    # localhost = 127.0.0.1 en el puerto 9999
    lines = streaming_context.socketTextStream("localhost", 9999)

    # de cada DStream que obtenemos queremos dividir en palabras para poder contarlas
    words = lines.flatMap(lambda line: line.split(" "))
    print(type(words))

    # Procesamos los DStream haciendo un conteo al estilo map/reduce
    pairs = words.map(lambda word: (word, 1))
    word_counts = pairs.reduceByKey(lambda x, y: x + y)

    # mostramos los primeros 10 elementos de cada RDD generado en el DStream en consola
    word_counts.pprint()

    #### Hasta este punto realmente no hemos ejecutado nada, solo hemos
    # configurado todo lo que necesitamos para recibir y procesar el stream de datos
    # para iniciar el procesamiento del streaming necesitamos hacer el llamada al
    # metodo start() del StreamingContext

    streaming_context.start()
    streaming_context.awaitTermination()


if __name__ == "__main__":
    count_words()

Transformaciones en los DStreams

  • map(func): Es el lapply de Spark, aplica una función a cada elemento del DStream, genera un nuevo DStream

  • flatMap(func): Es como el anterior solo que cada entrada puede ser mapeada a 0 o más registros de salida

  • filter(func): Como el filter de dplyr, genera un nuevo DStream con las observaciones qeu cumplen cierto(s) criterio(s)

  • repartition(numPartitions): Cambia el nivel de paralelismo en el DStream (específicamente en el que se está poniendo el repartiton) creando más o menos particiones

  • union(otherStream): Se genera un nuevo DStream con la unión de los elementos de 2 DStream

  • count(): Regresa un nuevo DStream que contiene el conteo del número de elementos en cada RDD que forma parte del DStream al que se le aplica el count

  • reduce(func): Regresa un nuevo DStream que agrega elementos de cada RDD en el DStream desde donde se aplica el reduce aplicando una función de agregación

  • countByValue(): Regresa un nuevo DStream con los conteos de frecuencia por llave que existen en todos los RDD del DStream al que se le aplica

  • reduceByKey(func, [numTasks]): Recibe un DStream con par al cual realiza una agregación, regresa un nuevo DStream con los pares agregados. numTasks tiene por default 2 cuando spark está instalado en modo local, de otra manera toma el valor que tenga el parámetro spark.default.parallelism

  • join(otherStream, [numTasks]): Aplica un full join a dos DStream \(\rightarrow\) recuerda que cada DStream es un stream de datos!.

  • cogroup(otherStream, [numTasks]): Se aplica a un part de DStream que tienen pares y para devolver un nuevo DStream con la tupla

  • transform(func): Aplica una función de transformación a todos los RDD de un DStream (genera otro RDD), es posible ocupar una transformación de las funciones que ya están implementadas en Spark o bien implementar una especia de UDF sobre transformación a realizar a los RDD que no existen a través del API.

  • updateStateByKey(func): Actualiza el estado de cada llave en el DStream aplicando una función al estado previo del DStream. Esta operación nos permite mantener un estado arbitrario mientras el estado actual se actualiza constantemente. Para ocupar este tipo de operación se requiere de lo siguiente:
    • Definir el estado (puede ser de un tipo de dato arbitrario)
    • Definir la función de actualización del estado. Se requiere de definir cómo actualizaremos el estado basándonos en el estado anterior y en los valores nuevos que vienen del stream.

Ejemplo: Utilizando el ejemplo de conteo de palabras agregaremos una función de actualización de estado, esta función será llamada por cada palabra donde los nuevos valores (newValues) son los 1 asociados a cada palabra y el runningCount corresponde al conteo “anterior”.

## definimos nuestra funcion de actualizacion
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)  # add the new values with the previous running count to get the new count
    
#### aplicamos la funcion de actualizacion de estado a un DStream que trae pares (word, 1)
runningCounts = pairs.updateStateByKey(updateFunction)

Para ocupar updateStateByKey se requiere de ocupar checkpoint (lo veremos más adelante).

Operaciones en Ventanas

Las operaciones window permiten aplicar transformaciones sobre una ventana de datos. Cada vez que se mueve la ventana (en este ejemplo de 3) todos los RDDs que caen dentro de la ventana son combinados para aplicarles alguna transformación u operación, en este ejemplo las operaciones son aplicadas a las 3 últimas unidades de datos.


* Fuente Spark streaming guide

Para definir una operación de ventana se requieren de 2 parámetros:

  1. Definir la longitud de la ventana -window length-: La duración de la ventana
  2. Definir el intervalo de deslizamiento -slide interval-: El interval sobe el cúal la operación es ejecutada. En el ejemplo, el slide interval es de 2 unidades de tiempo.

\(\rightarrow\) La definición de ambos parámetros debe ser múltiplos del intervalo definido para el batch en el DSource

Operaciones típicas

  • window(windowLength, slideInterval): Genera un DStream basado en los batches que forman parte de la ventana
  • countByWindow(windowLength, slideInterval): Regresa el conteo de elementosque forman parte de la ventana y el intervalo de deslizamiento de un stream
  • reduceByWindow(func, windowLength, slideInterval): Genera un nuevo elemento de stream creado por agregar elementos del stream que forman parte de una ventana e intervalo de deslizamiento a través de una función de agregación \(\rightarrow\) la función debe poderse ejecutar en paralelo por lo que tiene que ser asociativa y conmutativa
  • reduceByKeyAndWindow(func, windowLength, slieInterval, [numTasks]): Igual que el reduceByKey solo que se aplica sobre una ventana indicando la longitud de la ventana y el intervalo de deslizamiento
  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): Versión más eficiente que la anterior donde el reduce por ventana se calcula de forma incremental tomando en cuenta el reduce de la ventana anterior. Para que esto pueda suceder se requiere de tener una especie de inverse reducing sobre los datos viejos que salen de la ventana… normalmente implica poner la operación inversa a la ocupada en el reduce. Puede ser que no exista una función inversa a la ocupada en el reduce :( tonz no se puede aplicar esta función siempre.
  • countByValueAndWindow(windowLength, slideInterval, [numTasks]): Aqui el conteo se realiza por valor en la ventana -obtenemos la frecuencia por llave-. Los Dstream que forman parte de la ventana son pares , el resultado de la operación será .

Hagamos un ejemplo para entender cómo funcionan las operaciones en ventanas, modificaremos el ejemplo visto para contar palabras de los últimos 30 segundos de datos cada 10 segundos. Para poder hacer esto tendremos que aplicar un reduceByKey sobre los pares de DStream (word, 1) que se tengan de los últimos 30 segundos, esto lo haremos a través de la operación reduceByKeyAndWindow.

# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
Join
  • stream-stream join: Es posible hacer full inner join join, rightOuterJoin, leftOuterJoin y fullOuterJoin
stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
  • También podemos hacer join entre operaciones window
windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
  • stream-dataset join:
dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))

Outputs de los DStreams

Los DStreams creados puedes ser “empujados” a otros sistemas/frameworks externos como un DFS, una base de datos, etc. Las funciones de salida que existen para los DStreams son:

  • print(): \(\rightarrow\) pprint en python, imprime los primeros 10 elementos del cada batch de datos en un DStream (implica bajar a drive -el nodo donde se está corriendo la aplicación de streaming-)
  • saveAsTextFiles(prefix, [suffix]): Permite guardar el contenido de este DStream como un archivo de texto
  • saveAsObjectFiles(prefix, [suffix]): Guarda el contenido del DStream como una secuencia de archivos u objetos serializados en Java \(\rightarrow\) No está disponible para el API de Python
  • saveAsHadoopFiles(prefix, [suffix]): Guarda el contenido del Dstream como archivos de Hadoop. \(\rightarrow\) No está disponible para el API de Python
  • foreachRDD(func): Es la función más genérica para guardar datos, esta función permite que nosotros implementemos nuestra lógica para guardar datos a un sistema externo aplicando esta función a cada RDD dentro del DStream. Esta función es ejecutada en el driver en donde se ejecuta la aplicación de streaming que afectan el performance de los RDD que están llegando a través del stream.

Para ocupar esta funcinó de manera eficiente se recomienda:

  • Crear la conexión TCP que se ocupará para enviar los datos que se desean guardar desde el worker de Spark donde se planea guardar los datos no en el driver donde se encuentra la aplicación de Spark streaming.

Don´t

def sendRecord(rdd):
    connection = createNewConnection()  # executed at the driver
    rdd.foreach(lambda record: connection.send(record))
    connection.close()

dstream.foreachRDD(sendRecord)

En el ejemplo anterior haremos que la conexión creada desde el driver tenga que ser serializada para ser enviada al worker esto no es la forma en la que se crean conexiones … generará un error de serialización y/o inicialización.

  • Una vez que la conexión se genera desde el worker revisar que no se genere una conexión por cada registro! Crear una conexión requiere de algunos recursos computacionales, si creamos y destruimos una conexión por cada registro generaremos un overhead innecesario que afectará el desempeño del sistema. Ocupar rdd.foreachPartition para crear una conexión por RDD

Don’t

def sendRecord(record):
    connection = createNewConnection()
    connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))

Do

def sendPartition(iter):
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
  • Se pueden reutilizar conexiones ya creadas ocupando un pool de conexiones.
def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
  • Los DStream son ejecutados de forma lazy a través de las operaciones de salida, los RDD también se ejecutan lazy hasta que se hagan acciones sobre el RDD. Como los DStream contienen RDD, las acciones al RDD dentro de las operaciones de salida son las que forzan a la ejecución/procesamiento de los datos que se reciben en el stream, si nuestra aplicación no tiene operaciones de salida o tiene operaciones de salida del tipo dstream.foreachRDD() sin acciones dentro de los RDD que forman parte del stream nada será ejecutado, solo recibiremos datos y los tiraremos.

  • Las operaciones de salida se ejecutan una a la vez en el orden definido en la aplicacinó de streaming

Operaciones sobre DataFrame SQL

Podemos ocupar DataFrame SQL y todas sus funciones con datos que vienen del stream para ello necesitamos crear un SparkSession del SparkContext que está ocupando el StreamingContext.


# word_count.py
import pprint

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession

def getSparkSessionInstance(sparkConf):
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .config(conf=sparkConf)\
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']

# Convertimos los RDD en el DStream a DataFrame para ejecutar alguna funcion de SQL
def process(time, rdd):
    print("========= %s =========" % str(time))

    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] to RDD[Row] to DataFrame
        row_rdd = rdd.map(lambda w: Row(word=w))
        words_data_frame = spark.createDataFrame(row_rdd)

        # Creates a temporary view using the DataFrame.
        words_data_frame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        word_counts_data_frame = \
            spark.sql("select word, count(*) as total from words group by word")
        word_counts_data_frame.show()
    except:
        pass


def count_words():
    # Definimos el numero de cores que necesitamos, como este ejemplo corre local
    # ocupamos el local[n] que permite dedicar n numero de hilos al input DStream,
    # al receiver y al procesamiento.
    sc = SparkContext("local[2]")
    # definimos el intervalo de tiempo de stream (cada segundo generaremos)
    # batches de DStream
    streaming_context = StreamingContext(sc, 1)

    # Creamos un input DStream de tipo socket -basico- con el endpoint
    # localhost = 127.0.0.1 en el puerto 9999
    lines = streaming_context.socketTextStream("localhost", 9999)

    # de cada DStream que obtenemos queremos dividir en palabras para poder contarlas
    words = lines.flatMap(lambda line: line.split(" "))
    print(type(words))

    # Procesamos los DStream haciendo un conteo al estilo map/reduce
    pairs = words.map(lambda word: (word, 1))
    word_counts = pairs.reduceByKey(lambda x, y: x + y)

    # mostramos los primeros 10 elementos de cada RDD generado en el DStream en consola
    word_counts.pprint()

    #### Hasta este punto realmente no hemos ejecutado nada, solo hemos
    # configurado todo lo que necesitamos para recibir y procesar el stream de datos
    # para iniciar el procesamiento del streaming necesitamos hacer el llamada al
    # metodo start() del StreamingContext
    
    words.foreachRDD(process)
    streaming_context.start()
    streaming_context.awaitTermination()


if __name__ == "__main__":
    count_words()

Operaciones ML

Existen algoritmos específicos en el API de ml de spark para ser ocupados con streaming: Streaming Linear Regression, Streaming KMeans, aunque estos viven en la librería prohibida de mllib (╥﹏╥), también es posible que ocupar los modelos “tradicionales” entrenados en batch pero implementados para recibir datos de stream para pedicciones… aunque de nuevo este puente solo está implementado para la librería de mllib (╥﹏╥)

Cache y persistencia

Los DStream (como los RDD) se pueden guardar en memoria utilizando persist() lo que hará que se guarden los RDD del DStream en memoria, esta estrategia se ocupa si los datos que están en el DStream se procesarán varias veces (varias operaciones diferentes sobre los mismos datos). Los DStream que se generan de operaciones basadas en ventanas son implicitamente guardados en memoria por lo que ya no es necesario ocupar el persist.

Para los datos que vienen de fuentes avanzadas la persistencia por default está asociada a tener un factor de replicación de 2, la persistencia por default de los DStream consiste en mantener serializado los datos en memoria -lo que puede afectar el desempeño de nuestra aplicación de streaming-

Checkpointing

Una aplicación de stream debe operar 24/7 por lo que debe ser resilente a fallas no relacionadas a la lógica de la aplicación, para cumplir con esta propiedad Spark streaming requiere de hacer checkpoints a un sistema de almacenamiento tolerante a fallas con información suficiente para recuperarse de la falla. Existen 2 tipos de checkpoint en Spark Streaming:

  1. Metadata checkpointing: Se almacena información de nuestra aplicación de streaming en un DFS: HDFS o S3 -normalmente S3-
    • Configuración: La configuración utilizada para crear la aplicación de streaming
    • Operaciones de los DStream: El conjunto de operaciones que se aplican a los DStream como lógica de nuestra aplicación de streaming
    • Batches incompletos: Batches que fueron encolados para ser procesados pero no fueron completados
  2. Data checkpointing: Almacenamos los RDD generados a un DFS, los RDD son los intermedios durante todo el procesamiento, pero se guardan con todo su estado de manera periódica (no se guarda tooooodo el procesamiento y cada paso sobre los RDD)

Se requiere de utilizar checkpointing cuando:

  • Ocupamos updateStateByKey o reduceByKeyAndWindow con inverse function \(\rightarrow\) se debe proveer de un directorio para dejar el checkpoint
  • Queremos recuperarnos de fallas en el driver que corre la aplicación

Muchas de las aplicaciones de Spark Streaming no ocupan checkpoints y eso esta bien! :P